#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script generates roles based on what Apache Beam uses in GCP.
# The roles are defined in a YAML file.

import yaml
import datetime
import os
from google.cloud import iam_admin_v1
from google.api_core import exceptions

# Permissions cache to avoid repeated API calls.
permissions_cache = {}

ASF_LICENSE_HEADER = """# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the \"License\"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an \"AS IS\" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is auto-generated by generate_roles.py.
# Do not edit manually.
\n"""

def get_permission_stage(permission_name: str, project_id: str) -> str:
    """
    Finds the support level of a specific IAM permission for a given project. This function caches the results to avoid repeated API calls.

    Args:
        permission_name: The name of the permission to check, e.g., 'storage.buckets.create'.
        project_id: The ID of the GCP project to check against.
    Returns:
        The support level of the permission as a string, or "" if the permission is not found.
    """
    global permissions_cache

    try:
        if f"{project_id}-stage" in permissions_cache:
            return permissions_cache[f"{project_id}-stage"].get(permission_name, "")
        else:
            permissions_cache[f"{project_id}-stage"] = {}

        client = iam_admin_v1.IAMClient()
        resource = f"//cloudresourcemanager.googleapis.com/projects/{project_id}"

        request = iam_admin_v1.QueryTestablePermissionsRequest(
            full_resource_name=resource,
            page_size=1000
        )

        for permission in client.query_testable_permissions(request=request):
            permissions_cache[f"{project_id}-stage"][permission.name] = permission.custom_roles_support_level

        return permissions_cache[f"{project_id}-stage"].get(permission_name, "")

    except exceptions.PermissionDenied as e:
        print(f"Error: Permission denied. Ensure you have 'resourcemanager.projects.get' on project '{project_id}'.")
        print(f"Details: {e}")
        return ""
    except exceptions.NotFound as e:
        print(f"Error: Project '{project_id}' not found.")
        print(f"Details: {e}")
        return ""
    except Exception as e:
        print(f"An unexpected error occurred while fetching permissions: {e}")
        return ""

def get_role_permissions(role_name: str, project_id: str = "") -> list[str]:
    """
    Gets the permissions included in a predefined or custom IAM role, filtered to only GA permissions.

    Args:
        role_name: The full name of the role.
                   For predefined roles, e.g., 'roles/secretmanager.viewer'.
                   For custom roles, e.g., 'projects/your-project-id/roles/your-custom-role'.

        project_id: Optional, used for permission metadata lookup.
    Returns:
        A list of GA permissions associated with the role.
    """

    global permissions_cache
    print(f"Fetching permissions for role: {role_name} in project: {project_id}")

    try:
        if f"{project_id}-role" in permissions_cache and role_name in permissions_cache[f"{project_id}-role"]:
            return permissions_cache[f"{project_id}-role"].get(role_name, [])
        else:
            if f"{project_id}-role" not in permissions_cache:
                permissions_cache[f"{project_id}-role"] = {}

        client = iam_admin_v1.IAMClient()
        request = iam_admin_v1.GetRoleRequest(
            name=role_name,
        )
        role = client.get_role(request=request)
        all_perms = list(role.included_permissions)
        ga_perms = []
        for perm in all_perms:
            stage = get_permission_stage(perm, project_id)
            if stage == iam_admin_v1.Permission.CustomRolesSupportLevel.SUPPORTED:
                ga_perms.append(perm)
        
        permissions_cache[f"{project_id}-role"][role_name] = ga_perms
        return ga_perms
    except exceptions.NotFound:
        print(f"Error: The role '{role_name}' was not found.")
        return []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return []

def filter_permissions(permissions: list[str], allowed_prefixes: list[str] = [], denied_suffixes: list[str] = []) -> set[str]:
    """
    Filters permissions based on the provided services.

    Args:
        permissions: A list of permissions to filter.
        allowed_prefixes: A list of strings that permissions must contain to be included.
        denied_suffixes: A list of strings that permissions must not contain to be included.
    Returns:
        A list of permissions that match the specified services.
    """

    filtered_permissions = set()

    for perm in permissions:
        if any(perm.startswith(prefix) for prefix in allowed_prefixes):
            if not any(perm.endswith(suffix) for suffix in denied_suffixes):
                filtered_permissions.add(perm)

    return filtered_permissions

def generate_role(role_name: str , perms: set[str]) -> dict:
    return {
        "role_id": f"{role_name}",
        "title": f"{role_name}",
        "stage": "GA",
        "description": f"This is the {role_name} role",
        "permissions": sorted(list(perms)),
    }

def write_role_yaml(filename, role_data):
    if not role_data.get("permissions"):
        print(f"No permissions to write for {filename}. Skipping.")
        return
    with open(filename, "w") as f:
        f.write(ASF_LICENSE_HEADER)
        f.write(f"# This file was generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n")
        yaml.dump(role_data, f, default_flow_style=False)

def get_config():
    """
    Reads the roles configuration from the YAML file and returns it as a dictionary.
    The configuration includes services, roles, and suffixes for filtering permissions.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, "roles_config.yaml")
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)

    # Each role inherits permissions from the previous role.
    # This means that the viewer role has all the permissions of the committer role, and so on.
    # The roles are defined in the order of viewer, committer, infra_manager, and admin.
    # The viewer role is the base role, so its file contains all its

    response = {
        "project_id": config.get("project_id", "apache-beam-testing"),
        "roles_prefix": config.get("roles_prefix", "beam"),
        "role": {}
    }

    # Add suffixes to the response
    suffixes = {}
    for suffix in config.get("suffixes", []):
        suffixes[suffix["name"]] = suffix["values"]

    services = set()
    roles = set()

    # Sort roles by hierarchy to ensure they are processed in the correct order.
    config["roles"].sort(key=lambda x: int(x.get("hierarchy", 0)))

    for role in config["roles"]:
        services.update(role.get("services", []))
        roles.update(role.get("roles", []))

        response["role"][role["name"]] = {
            "name": role["name"],
            "description": role.get("description", f"This is the {role['name']} role"),
            "services": services.copy(),
            "roles": roles.copy(),
            "except_suffixes": [],
        }

        # If the role has except_suffixes, add them to the response
        suffix_set = set()
        for except_suffix in role.get("except_suffixes", []):
            if except_suffix in suffixes:
                suffix_set.update(suffixes[except_suffix])
            else:
                raise ValueError(f"Unknown suffix '{except_suffix}' in role '{role['name']}'")
        if suffix_set:
            response["role"][role["name"]]["except_suffixes"] = list(suffix_set)

    return response

def get_roles():
    """
    Generates the roles based on the predefined services and permissions.
    This function creates roles for Beam Viewer, Committer, Infra Manager, and Admin.
    It filters permissions based on the allowed and denied strings defined in the configuration.
    """

    config = get_config()
    response = {}

    project_id = config["project_id"]

    permissions_added = set()

    for role in config["role"].values():
        print(f"Generating role: {config['roles_prefix']}_{role['name']} with services: {role['services']} and roles: {role['roles']}")
        # Get the permissions for each base role.
        role_permissions = set()
        for role_name in role["roles"]:
            role_permissions.update(get_role_permissions(role_name, project_id))
        role["permissions"] = filter_permissions(
            permissions=list(role_permissions),
            allowed_prefixes=list(role["services"]),
            denied_suffixes=role.get("except_suffixes", [])
        )
        # Remove already added permissions to avoid duplicates.
        role["permissions"] = role["permissions"].difference(permissions_added)
        permissions_added.update(role["permissions"])
        response[f"{config['roles_prefix']}_{role['name']}"] = generate_role(f"{config['roles_prefix']}_{role['name']}", role["permissions"])

    return response

def main():
    """
    Main function to generate the roles and write them to YAML files.
    It creates a directory for the roles if it doesn't exist and writes each role to its respective file.
    """

    roles = get_roles()

    for role_name, role_data in roles.items():
        filename = f"{role_name}.role.yaml"
        write_role_yaml(filename, role_data)
        print(f"Generated {filename} with {len(role_data['permissions'])} permissions.")

if __name__ == "__main__":
    main()
